Skip to content

Add generic script support for update/upsert operations in OpenSearch sink#6744

Merged
dinujoh merged 2 commits into
opensearch-project:mainfrom
dinujoh:feature/opensearch-sink-script-support
Apr 10, 2026
Merged

Add generic script support for update/upsert operations in OpenSearch sink#6744
dinujoh merged 2 commits into
opensearch-project:mainfrom
dinujoh:feature/opensearch-sink-script-support

Conversation

@dinujoh

@dinujoh dinujoh commented Apr 9, 2026

Copy link
Copy Markdown
Member

Description

Adds generic script support for update/upsert operations in the OpenSearch sink. When configured, the sink attaches an inline script (https://opensearch.org/docs/latest/api-reference/document-apis/update-document/#script-example) to bulk update/upsert requests.

Key behaviors:

  • The event document is automatically passed as params.doc
  • scripted_upsert is always true — the script runs on every write including first create
  • Script params support ${} expression syntax for dynamic values from event fields or metadata
    Configuration
opensearch:                                                                                                                                                                                                    
  hosts: ["https://localhost:9200"]                                                                                                                                                                            
  index: "my-index"                                                                                                                                                                                            
  action: "upsert"                                                                                                                                                                                             
  document_id: "${/id}"                                                                                                                                                                                        
  script:                                                                                                                                                                                                      
    source: "ctx._source.putAll(params.doc); ctx._source.write_count = (ctx._source.containsKey('write_count') ? ctx._source.write_count + 1 : 1)"                                                                                                                                                                                                                                                 
    params:                                                                                                                                                                                                    
      table: "${/source_table}"                                                                                                                                                                                

Examples

  1. Merge with write tracking
script:                                                                                                                                                                                                        
  source: "ctx._source.putAll(params.doc); ctx._source.write_count = (ctx._source.containsKey('write_count') ? ctx._source.write_count + 1 : 1)"                                                                                                                                                                                                                                                 

First write → {"id": 1, "name": "test", "write_count": 1}
Second write → {"id": 1, "name": "updated", "write_count": 2}
2. Conditional price update — only update if lower

  script:                                                                                                                                                                                                        
    source: "if (params.doc.price < ctx._source.price) { ctx._source.price = params.doc.price }"                                                                                                                 
  1. Version-gated full replace
  script:                                                                                                                                                                                                        
    source: "if (params.doc.version > ctx._source.version) { ctx._source = params.doc }"                                                                                                                                                                                                                                                                                                                  
  1. Multi-table denormalization with dynamic params
  script:                                                                                                                                                                                                        
    source: |                                                                                                                                                                                                    
      String[] fields = params.fields.splitOnToken(',');                                                                                                                                                         
      if (params.is_delete == 'true') {                                                                                                                                                                          
        for (String field : fields) { ctx._source.remove(field) }                                                                                                                                                
      } else {                                                                                                                                                                                                   
        for (String field : fields) {                                                                                                                                                                            
          if (params.doc.containsKey(field)) { ctx._source[field] = params.doc[field] }                                                                                                                          
        }                                                                                                                                                                                                        
      }                                                                                                                                                                                                                                                                                                                                                                                                   
    params:                                                                                                                                                                                                      
      table: "${/_table}"                                                                                                                                                                                        
      fields: "${/_fields}"                                                                                                                                                                                      
      is_delete: "${/_is_delete}"                                                                                                                                                                                

Events from products table:

 {"primary_key": "prod-1", "title": "Widget A", "_table": "products", "_fields": "title,description", "_is_delete": "false"}                                                                                    

Events from pricing table:

 {"primary_key": "prod-1", "price": 9.99, "_table": "pricing", "_fields": "price,currency", "_is_delete": "false"}                                                                                              

Result in OpenSearch:

  {"title": "Widget A", "price": 9.99}                                                                                                                                                                           

Delete pricing:

  {"primary_key": "prod-1", "_table": "pricing", "_fields": "price,currency", "_is_delete": "true"}                                                                                                              

Result:

  {"title": "Widget A"}    

Issues Resolved

Resolves #3563

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented Apr 9, 2026

Copy link
Copy Markdown

✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

@dinujoh dinujoh force-pushed the feature/opensearch-sink-script-support branch 5 times, most recently from 76a6246 to 78d49a6 Compare April 9, 2026 15:28

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dinujoh !

}

private BulkOperation getBulkOperationForAction(final String action,
BulkOperation getBulkOperationForAction(final String action,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than expose this to test using package private, what would be the effort to move this to its own class - say BulkOperationFactory?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will refactor BulkOperationFactory as a follow-up to keep this PR scoped.

… sink

Adds the ability to configure a script on the OpenSearch sink that gets
applied to update and upsert bulk operations. This is a generic mechanism
that passes script source and params through to OpenSearch's bulk API.
Script language is hardcoded to painless. The event document is
automatically passed as params.doc. scripted_upsert is always true
so the script runs on every write including the first create.
Script param values support ${} expression syntax for dynamic
resolution from event fields or metadata.

Configuration example:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        index: "my-index"
        action: "upsert"
        document_id: "${/id}"
        script:
          source: "ctx._source.putAll(params.doc); ctx._source.source = params.table"
          params:
            table: "${getMetadata(\"table_name\")}"

Resolves opensearch-project#3563

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
@dinujoh dinujoh force-pushed the feature/opensearch-sink-script-support branch 3 times, most recently from b1b66a1 to 9eef437 Compare April 9, 2026 17:14
@dinujoh dinujoh requested a review from dlvenable April 9, 2026 17:30
dlvenable
dlvenable previously approved these changes Apr 9, 2026
… expectations

Remove unused imports (ScriptConfiguration, HashMap, Map) from
OpenSearchSink. Update expected compressed bulk request sizes in
integration tests to account for the additional resolvedScriptParameters
field in SerializedJsonImpl.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
@dinujoh dinujoh force-pushed the feature/opensearch-sink-script-support branch from 917ecdd to 57a8d87 Compare April 9, 2026 18:56
@dinujoh dinujoh merged commit 3bd796d into opensearch-project:main Apr 10, 2026
97 of 101 checks passed
@dinujoh dinujoh deleted the feature/opensearch-sink-script-support branch April 10, 2026 03:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support conditional script update of documents in the OpenSearch sink

3 participants